
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
//import org.apache.flink.table.functions.UserDefinedAggregateFunction;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.call;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.*;
public class GroupWindowFlatAggregate
{

    public static void main(String[] args) throws Exception

    {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


        DataStream<OrderStream> orderA = env.fromCollection(Arrays.asList(
                new OrderStream(1,1L, "beer", 3, 1505529000000L), //2017-09-16 10:30:00
                new OrderStream(2,1L, "beer", 3, 1505529000000L), //2017-09-16 10:30:00
                new OrderStream(3,3L, "rubber", 2, 1505527800000L),//2017-09-16 10:10:00
                new OrderStream(4,3L, "rubber", 2, 1505527800000L),//2017-09-16 10:10:00
                new OrderStream(5,1L, "diaper", 4, 1505528400000L),//2017-09-16 10:20:00
                new OrderStream(6,1L, "diaper", 4, 1505528400000L)//2017-09-16 10:20:00
        ));

        Table orders=tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<OrderStream>()
        {

            Long currentMaxTimestamp = 0L;
            final Long maxOutOfOrderness = 10000L;// 最大允许的乱序时间是10s

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

            //            @Nullable
            @Override
            public org.apache.flink.streaming.api.watermark.Watermark getCurrentWatermark()
            {

                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }


            @Override
            public long extractTimestamp(OrderStream element, long recordTimestamp)
            {
                long timestamp = element.rowtime;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);

                System.out.println("id:"+element.id+","+"user:"+element.user+"," +
                        "eventtime:["+element.rowtime+"|"+sdf.format(element.rowtime)+"" +
                        "],currentMaxTimestamp:["+currentMaxTimestamp+"|"+
                        sdf.format(currentMaxTimestamp)+"],watermark:["+
                        getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]");
                return timestamp;
            }
        }),$("user"), $("product"), $("amount"), $("rowtime").rowtime());



        System.out.println(orders.getSchema());

        tEnv.registerFunction("top2",new Top2());



        Table result = orders
                .window(Tumble.over(lit(5).minutes())
                        .on($("rowtime"))
                        .as("w")) // define window
                .groupBy($("product"), $("w"),$("user"),$("amount")) // group by key and window
                .flatAggregate(call("top2", $("amount")).as("v", "rank"))
                        .select($("user"),
                                $("w").start(),
                                $("w").end(),
                                $("w").rowtime(),
                                $("v"),
                                $("rank"));

        tEnv.toRetractStream(result, Row.class).print();

        env.execute();


    }

}
